package org.budo.warehouse.logic.util;

import java.util.ArrayList;
import java.util.List;

import org.budo.warehouse.logic.api.DataEntry;
import org.budo.warehouse.logic.api.DataEntryPojo;
import org.budo.warehouse.logic.api.DataEntryPojo.Row;
import org.budo.warehouse.logic.api.DataMessage;
import org.budo.warehouse.logic.api.DataMessagePojo;
import org.budo.warehouse.service.entity.EntryBuffer;
import org.budo.warehouse.service.entity.Pipeline;

/**
 * Static helpers that convert between a {@link DataMessage} (a list of
 * {@link DataEntry}) and a list of {@link EntryBuffer} records, in both
 * directions. Rows are converted to and from their string form through
 * {@code DataEntryUtil.rowsToJson} / {@code DataEntryUtil.jsonToRows}.
 *
 * @author limingwei
 */
public class EntryBufferLogicUtil {
    /**
     * Converts every entry of the message into an {@link EntryBuffer}, passing
     * {@code false} as the {@code simplify} flag.
     *
     * @param dataMessage the message whose entries are converted; may be {@code null}
     * @param pipeline    the pipeline whose id is stamped on each buffer; may be {@code null}
     * @return a new list with one buffer per entry, empty if there are no entries
     */
    public static List<EntryBuffer> messageToBuffers(DataMessage dataMessage, Pipeline pipeline) {
        return messageToBuffers(dataMessage, pipeline, false);
    }

    /**
     * Converts every entry of the message into an {@link EntryBuffer}.
     * <p>
     * Each buffer receives the entry's event type, schema name, table name and
     * SQL, plus the entry's rows serialized by
     * {@code DataEntryUtil.rowsToJson(rows, simplify)}.
     *
     * @param dataMessage the message whose entries are converted; a {@code null}
     *                    message, or a message whose entry list is {@code null},
     *                    yields an empty result
     * @param pipeline    the pipeline whose id is stamped on each buffer; when
     *                    {@code null} the buffers get a {@code null} pipeline id
     * @param simplify    forwarded unchanged to {@code DataEntryUtil.rowsToJson}
     * @return a new, mutable list with one buffer per entry, in entry order
     */
    public static List<EntryBuffer> messageToBuffers(DataMessage dataMessage, Pipeline pipeline, Boolean simplify) {
        // Nothing to convert: return an empty list rather than failing with a NullPointerException.
        if (null == dataMessage) {
            return new ArrayList<EntryBuffer>();
        }

        List<DataEntry> dataEntries = dataMessage.getDataEntries();
        if (null == dataEntries) {
            return new ArrayList<EntryBuffer>();
        }

        // One buffer is produced per entry, so the final size is known up front.
        List<EntryBuffer> entryBuffers = new ArrayList<EntryBuffer>(dataEntries.size());

        for (DataEntry dataEntry : dataEntries) {
            // Reuse the entry when it already is a pojo, otherwise wrap it in one to read its rows.
            DataEntryPojo dataEntryPojo = dataEntry instanceof DataEntryPojo //
                    ? (DataEntryPojo) dataEntry //
                    : new DataEntryPojo(dataEntry);

            List<Row> rows = dataEntryPojo.getRows();
            String rowsJson = DataEntryUtil.rowsToJson(rows, simplify);

            EntryBuffer entryBuffer = new EntryBuffer() //
                    .setPipelineId(null == pipeline ? null : pipeline.getId()) //
                    .setEventType(dataEntry.getEventType()) //
                    .setSchemaName(dataEntry.getSchemaName()) //
                    .setTableName(dataEntry.getTableName()) //
                    .setSql(dataEntry.getSql()) //
                    .setRows(rowsJson);

            entryBuffers.add(entryBuffer);
        }

        return entryBuffers;
    }

    /**
     * Rebuilds a {@link DataMessage} from buffered entries.
     * <p>
     * Each buffer becomes a {@link DataEntryPojo} carrying the buffer's event
     * type, schema name, table name and SQL, with rows restored by
     * {@code DataEntryUtil.jsonToRows}.
     *
     * @param entryBuffers the buffers to convert; {@code null} is treated as an
     *                     empty list
     * @param dataNodeId   passed as the second argument of the
     *                     {@link DataMessagePojo} constructor
     * @return a new {@link DataMessagePojo} holding one entry per buffer, in
     *         buffer order
     */
    public static DataMessage buffersToMessage(List<EntryBuffer> entryBuffers, Integer dataNodeId) {
        List<DataEntry> dataEntries = new ArrayList<DataEntry>();

        // A null buffer list produces a message with no entries instead of a NullPointerException.
        if (null != entryBuffers) {
            for (EntryBuffer entryBuffer : entryBuffers) {
                String rowsJson = entryBuffer.getRows();
                List<Row> rows = DataEntryUtil.jsonToRows(rowsJson);

                DataEntryPojo dataEntry = new DataEntryPojo() //
                        .setEventType(entryBuffer.getEventType()) //
                        .setSchemaName(entryBuffer.getSchemaName()) //
                        .setTableName(entryBuffer.getTableName()) //
                        .setSql(entryBuffer.getSql()) //
                        .setRows(rows);
                dataEntries.add(dataEntry);
            }
        }

        // NOTE(review): the first constructor argument is deliberately left null here;
        // presumably a message id that buffers do not carry — confirm against DataMessagePojo.
        return new DataMessagePojo(null, dataNodeId, dataEntries);
    }
}